[新機能]Kinesis Streamsがシャード単位のメトリクスに対応しました
2016/04/19のアップデートにより、Kinesisストリームはシャード単位のメトリクス(shard level metrics)に対応しました。
従来はストリーム単位のメトリクスにしか対応していませんでしたが、今回の機能追加により
- シャード間のレコードの偏り
- データ処理の止まっているシャード
- データ処理遅延の発生しているシャード
などを簡単に検出できるようになりました。
機能概要
シャード単位メトリクスを有効にするには?
シャード単位のメトリクスは RDS にもあるEnhanced Monitoring(拡張モニタリング)の形で提供され、標準では無効化されています。また、メトリクス単位で有効・無効を制御できます。
新規に提供されるメトリクス一覧
CloudWatchのKinesisストリームのストリーム・シャードそれぞれのメトリクスを確認します。
シャード単位のメトリクスが今回追加されたメトリクスです。
説明 | Stream Level | Shard Level |
取得したバイト数 | GetRecords.Bytes | OutgoingBytes |
レコード追加されてから、処理されるまでのラグ。 Deprecated | GetRecords.IteratorAge | N/A |
レコード追加されてから、処理されるまでのラグ。 単位はミリ秒 | GetRecords.IteratorAgeMilliseconds | IteratorAgeMilliseconds |
GetRecords APIの処理時間 | GetRecords.Latency | N/A |
GetRecords APIでストリームから取得したレコード数 | GetRecords.Records | OutgoingRecords |
GetRecords APIの成功数 | GetRecords.Success | N/A |
追加したバイト数
APIの合計 |
IncomingBytes | IncomingBytes |
追加したレコード数
APIの合計 |
IncomingRecords | IncomingRecords |
PutRecord APIでストリームに追加したバイト数 | PutRecord.Bytes | N/A |
PutRecord APIの処理時間 | PutRecord.Latency | N/A |
PutRecord APIの成功数 | PutRecord.Success | N/A |
PutRecords APIでストリームに追加したバイト数 | PutRecords.Bytes | N/A |
PutRecords APIでストリームに追加したバイト数 | PutRecords.Latency | N/A |
PutRecords APIの処理時間 | PutRecords.Records | N/A |
PutRecords APIの成功数 | PutRecords.Success | N/A |
レコード取得でスロットリングされたAPI呼び出し数 | ReadProvisionedThroughputExceeded | ReadProvisionedThroughputExceeded |
レコード追加でスロットリングされたAPI呼び出し数 | WriteProvisionedThroughputExceeded | WriteProvisionedThroughputExceeded |
メトリクスの見方
シャード間のレコードの偏っていないか確認するには、IncomingRecordsのSUMを確認します。
データ処理の止まっているシャードやデータ処理遅延の発生しているシャードを確認するには、IteratorAgeMillisecondsのAVERAGEを確認します。
メトリクスの詳細は次のURLから確認できます。
http://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html
CloudWatchのグラフ
CloudWatchの管理画面からストリーム「foo」のシャード単位のIncomingRecordsメトリクスをグラフ化したのが以下です。
ストリーム「foo」を構成する「0番」と「1番」のシャードそれぞれのメトリクスを取得出来ていますね。
2シャード間でレコード数はほぼ同じで偏りがないことがわかります。
Kinesis シャード単位メトリクスの操作例
Kinesisストリームの拡張モニタリング機能はAPI経由でしか操作できません。 以下では AWS CLI からシャード単位メトリクスを操作します。
準備
まずは準備としてKinesisストリームを作成します。
$ aws --version aws-cli/1.10.25 Python/2.7.10 Linux/4.4.8-20.46.amzn1.x86_64 botocore/1.4.16 # Create Kinesis Stream $ aws kinesis create-stream --stream-name bar --shard-count 2 $ aws kinesis wait stream-exists --stream-name bar aws kinesis wait stream-exists --stream-name bar # Describe Kinesis Stream $ aws kinesis describe-stream --stream-name bar { "StreamDescription": { "RetentionPeriodHours": 24, "StreamName": "bar", "Shards": [ { "ShardId": "shardId-000000000000", "HashKeyRange": { "EndingHashKey": "170141183460469231731687303715884105727", "StartingHashKey": "0" }, "SequenceNumberRange": { "StartingSequenceNumber": "49561278658050145864612465902958334562332714288264773634" } }, { "ShardId": "shardId-000000000001", "HashKeyRange": { "EndingHashKey": "340282366920938463463374607431768211455", "StartingHashKey": "170141183460469231731687303715884105728" }, "SequenceNumberRange": { "StartingSequenceNumber": "49561278658072446609810996526099870280605362649770754066" } } ], "StreamARN": "arn:aws:kinesis:ap-northeast-1:123456789012:stream/bar", "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "StreamStatus": "ACTIVE" } }
ストリームの describe
結果に EnhancedMonitoring
ブロックが追加されていますね。
シャード単位メトリクスを有効にする
シャード単位メトリクスは拡張モニタリングのため、標準では無効化されています。
enable-enchanced-monitoring
API で有効化します。
メトリクス単位で有効・無効を制御出来るため、有効にするメトリクスを--shard-level-metrics
で指定します。
# Enable Enchanced Monitoring $ aws kinesis enable-enhanced-monitoring \ --stream-name bar \ --shard-level-metrics IncomingBytes IncomingRecords { "StreamName": "bar", "CurrentShardLevelMetrics": [], "DesiredShardLevelMetrics": [ "IncomingBytes", "IncomingRecords" ] } $ aws kinesis wait stream-exists --stream-name bar # Describe Kinesis Stream $ aws kinesis describe-stream --stream-name bar | jq .StreamDescription.EnhancedMonitoring [ { "ShardLevelMetrics": [ "IncomingBytes", "IncomingRecords" ] } ]
enable-enchanced-monitoring
API のレスポンスには
- 変更前に有効なメトリクス(
CurrentShardLevelMetrics
) - 変更後に有効なメトリクス(
DesiredShardLevelMetrics
)
がかえってきます。
すべてのシャード単位メトリクスを有効にする
有効にするメトリクスを逐一指定するのは手間ですね。
--shard-level-metrics ALL
とすると、全てのシャード単位メトリクスが有効になります。
$ aws kinesis enable-enhanced-monitoring \ --stream-name bar \ --shard-level-metrics ALL { "StreamName": "bar", "CurrentShardLevelMetrics": [ "IncomingBytes", "IncomingRecords" ], "DesiredShardLevelMetrics": [ "IncomingBytes", "OutgoingRecords", "IteratorAgeMilliseconds", "IncomingRecords", "ReadProvisionedThroughputExceeded", "WriteProvisionedThroughputExceeded", "OutgoingBytes" ] } $ aws kinesis wait stream-exists --stream-name bar $ aws kinesis describe-stream --stream-name bar | jq .StreamDescription.EnhancedMonitoring [ { "ShardLevelMetrics": [ "IncomingBytes", "OutgoingRecords", "IteratorAgeMilliseconds", "IncomingRecords", "ReadProvisionedThroughputExceeded", "WriteProvisionedThroughputExceeded", "OutgoingBytes" ] } ]
シャード単位メトリクスを削減する
取得するメトリクスを削減したい場合、削除するメトリクスをdisable-enchanced-monitoring
API で無効化します。
$ aws kinesis disable-enhanced-monitoring \ --stream-name bar \ --shard-level-metrics IncomingBytes OutgoingRecords { "StreamName": "bar", "CurrentShardLevelMetrics": [ "IncomingBytes", "OutgoingRecords", "IteratorAgeMilliseconds", "IncomingRecords", "ReadProvisionedThroughputExceeded", "WriteProvisionedThroughputExceeded", "OutgoingBytes" ], "DesiredShardLevelMetrics": [ "IteratorAgeMilliseconds", "IncomingRecords", "ReadProvisionedThroughputExceeded", "WriteProvisionedThroughputExceeded", "OutgoingBytes" ] } $ aws kinesis describe-stream --stream-name bar | jq .StreamDescription.EnhancedMonitoring [ { "ShardLevelMetrics": [ "IteratorAgeMilliseconds", "IncomingRecords", "ReadProvisionedThroughputExceeded", "WriteProvisionedThroughputExceeded", "OutgoingBytes" ] } ]
enable-enchanced-monitoring
API と同じく、disable-enchanced-monitoring
API のレスポンスには
- 変更前に有効なメトリクス(
CurrentShardLevelMetrics
) - 変更後に有効なメトリクス(
DesiredShardLevelMetrics
)
がかえってきます。
全部削除する
無効にするメトリクスを逐一指定するのは手間ですね。
--shard-level-metrics ALL
とすると、全てのシャード単位メトリクスが無効になります。
$ aws kinesis disable-enhanced-monitoring \ --stream-name bar \ --shard-level-metrics ALL { "StreamName": "bar", "CurrentShardLevelMetrics": [ "IteratorAgeMilliseconds", "IncomingRecords", "ReadProvisionedThroughputExceeded", "WriteProvisionedThroughputExceeded", "OutgoingBytes" ], "DesiredShardLevelMetrics": [] } $ aws kinesis describe-stream --stream-name bar | jq .StreamDescription.EnhancedMonitoring [ { "ShardLevelMetrics": [] } ]
CloudWatch からメトリクスを取得する
get-metric-statistics
API で --dimensions
として
- ストリーム
- シャード
を指定します。
$ aws cloudwatch get-metric-statistics \ --namespace AWS/Kinesis \ --metric-name IncomingRecords \ --dimensions Name=ShardId,Value=shardId-000000000001 \ Name=StreamName,Value=foo \ --start-time 2016-04-20T21:00:00 \ --end-time 2016-04-20T22:00:00 \ --period 600 \ --statistics Sum { "Datapoints": [ { "Timestamp": "2016-04-20T21:30:00Z", "Sum": 550.0, "Unit": "Count" }, { "Timestamp": "2016-04-20T21:40:00Z", "Sum": 571.0, "Unit": "Count" }, { "Timestamp": "2016-04-20T21:10:00Z", "Sum": 577.0, "Unit": "Count" }, { "Timestamp": "2016-04-20T21:00:00Z", "Sum": 577.0, "Unit": "Count" }, { "Timestamp": "2016-04-20T21:50:00Z", "Sum": 572.0, "Unit": "Count" }, { "Timestamp": "2016-04-20T21:20:00Z", "Sum": 591.0, "Unit": "Count" } ], "Label": "IncomingRecords" }
注意点
APIからしか拡張モニタリングは有効に出来ない
Kinesisストリームの拡張モニタリング機能はAPI経由でしか操作できません。 Kinesisストリームはデータ保持期間の変更やシャードの分割・統合などもAPI経由でしか操作できないので、Kinesisに慣れている人からすれば「またか」という感じですが、管理画面からも変更出来るようにしてほしいものです。
更新中のストリームは変更できない
Kinesis ストリームの更新操作(拡張モニタリング、シャードの分割/統合など)は "StreamStatus" が "ACTIVE" なストリームに対してしか操作できません。
"StreamStatus" が "UPDATING" なストリームに更新操作を行うと、以下の様なエラーが発生します。
$ aws kinesis enable-enhanced-monitoring --stream-name bar --shard-level-metrics ALL A client error (ResourceInUseException) occurred when calling the EnableEnhancedMonitoring operation: Stream bar under account 123456789012 not ACTIVE, instead in state UPDATING
enable/disable-enchanced-monitoring APIによって取得するメトリクスに差分がない時
enable/disable-enchanced-monitoring
API 実行によって CurrentShardLevelMetrics と DesiredShardLevelMetrics に差分がない場合、Kinesis ストリームの更新処理は走らず、"StreamStatus" は "ACTIVE" なままです。
$ aws kinesis enable-enhanced-monitoring \ --stream-name bar \ --shard-level-metrics IteratorAgeMilliseconds { "StreamName": "bar", "CurrentShardLevelMetrics": [ "IteratorAgeMilliseconds", "IncomingRecords", "ReadProvisionedThroughputExceeded", "OutgoingBytes" ], "DesiredShardLevelMetrics": [ "IteratorAgeMilliseconds", "IncomingRecords", "ReadProvisionedThroughputExceeded", "OutgoingBytes" ] } $ aws kinesis describe-stream --stream-name bar | jq .StreamDescription.StreamStatus "ACTIVE"
参考
- https://aws.amazon.com/jp/blogs/news/amazon-kinesis-update-amazon-elasticsearch-service-integration-shard-level-metrics-time-based-iterators/
- http://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html
- http://docs.aws.amazon.com/kinesis/latest/APIReference/API_EnableEnhancedMonitoring.html
- http://docs.aws.amazon.com/kinesis/latest/APIReference/API_DisableEnhancedMonitoring.html